-
Notifications
You must be signed in to change notification settings - Fork 662
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[AUTHN-1982] Implement OAuthBearer mechanism for Kafka client to fetch JWT token by communicating with SPIRE agent #1015
base: master
Are you sure you want to change the base?
Conversation
…er and identityPollId as extentions to oauthBearerToken
49fe984
to
87ba39f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this PR @chang-you . I gave it a first-pass review, it's a good example to add.
Apart from the comments on the code, there are a few other things:
-
Is it possible to add a README into the folder (like inside examples/spire_consumer_example/README). It can talk about:
a. how custom OAuth handlers are to be implemented,
b. how to run this example locally.
c. make it more clear that the client doesn't provide support for SPIRE/SPIFFE, but rather, that this is supposed to be an example implementation of a custom OAuth handler.
I can help with writing this README if you prefer, especially part a. and c. -
Is there an open guide on how to set up a SPIRE server locally? As the linked guide is inside Confluent's internal page, and external users of the library can't access it.
-
Add the binary to the examples/.gitignore and the example link to examples/README. Similar to other examples. And add an entry into the CHANGELOG.md, too.
// But in this example we choose to terminate | ||
// the application if all brokers are down. | ||
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e) | ||
if e.Code() == kafka.ErrAllBrokersDown { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This if block isn't needed, you can remove the comment too, and replace with just
// Errors should generally be considered
// informational, the client will try to
// automatically recover.
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
"group.id": "myGroup", | ||
"session.timeout.ms": 6000, | ||
"auto.offset.reset": "earliest", | ||
"enable.auto.offset.store": false, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove this config, as it's not relevant to this example and will complicate things
if e.Headers != nil { | ||
fmt.Printf("%% Headers: %v\n", e.Headers) | ||
} | ||
_, err := c.StoreMessage(e) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove the StoreMessage and the subsequent error check, as it's not needed after removing "enable.auto.offset.store"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
regarding this file: Would it make sense to keep just one file (either the consumer or the producer, but not both)?
Since this is an example to demonstrate the usage of a custom oauth handler, what we're actually doing with the client shouldn't be the main focus of the example, what do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the comments! That makes sense too.
@arvindth what do you think? Thank you!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, a single producer or consumer example is probably good enough.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
} | ||
} | ||
|
||
func retrieveJWTToken(ctx context.Context, principal, socketPath string, audience []string) (kafka.OAuthBearerToken, func() error, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a comment for this method.
We don't need to describe the exact logic that the method uses (in fact, it's probably a fair idea to say that we can use any arbitrary logic, like here we are using the go-spiffe library as long as we construct a kafka.OAuthBearerToken
token correctly).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the comments. To clarify:
We don't need to describe the exact logic that the method use
Here do you suggest that we don't need to include a description of this method in the readme, or is there something specific I should modify within this method?
"time" | ||
) | ||
|
||
// handleJWTTokenRefreshEvent retrieves JWT from the SPIRE workload API and |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// handleJWTTokenRefreshEvent retrieves JWT from the SPIRE workload API and | |
// handleJWTTokenRefreshEvent retrieves JWT from the SPIFFE workload API and |
for correct terminology.
"github.com/confluentinc/confluent-kafka-go/v2/kafka" | ||
) | ||
|
||
// handleJWTTokenRefreshEvent retrieves JWT from the SPIRE workload API and |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// handleJWTTokenRefreshEvent retrieves JWT from the SPIRE workload API and | |
// handleJWTTokenRefreshEvent retrieves JWT from the SPIFFE workload API and |
@chang-you what's the status of this PR? |
// It must be invoked whenever kafka.OAuthBearerTokenRefresh appears on the client's event channel, | ||
// which will occur whenever the client requires a token (i.e. when it first starts and when the | ||
// previously-received token is 80% of the way to its expiration time). | ||
func handleJWTTokenRefreshEvent(ctx context.Context, client kafka.Handle, principal, socketPath string, audience []string) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@chang-you I don't think we're using the principal here correctly. In the oauth producer case, the principal needs to be passed in because it's the principal for which we get a token, and the token would then contain the principal in the sub
claim.
For spire though, the principal value should be a spiffe id, and we should be using this spiffe id to look through all the svids returned by the spire agent to see if one of them matches the given principal, and use that one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@arvindth Do you suggest we should use FetchJWTSVIDs instead of FetchJWTSVID and use the JWTSVID with its spiffeid.ID
that match the sub
?
"bootstrap.servers": bootstrapServers, | ||
"security.protocol": "SASL_SSL", | ||
"sasl.mechanisms": "OAUTHBEARER", | ||
"sasl.oauthbearer.config": principal, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if this parameter is even required. Could we remove it and see if it works as expected?
Extensions: extensions, | ||
} | ||
|
||
return oauthBearerToken, jwtSource.Close, nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't seem necessary to return jwtSource.Close
, as defer jwtSource.Close()
is called above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you!
What is the purpose of the change
The goal of these changes is to write a test that integrates the Kafka client with a SPIRE server that runs in your local machine to fetch JWT token via the workload API.
handleJWTTokenRefreshEvent
function to handle token refresh eventsretrieveJWTToken
function to fetch JWT from SPIRE agentReferences
What will this work accomplish/help with?
This example test will serve as a guide for integrating Confluent’s Kafka Golang client with SPIRE to mint SPIFFE SVID’s in the form of JWT’s, this will be the client-side changes needed, we will have a separate ticket to implement the server-side changes where Kafka will be able to validate these credentials with SPIRE via the SPIRE agent/the SPIFFE workload API.
Verify the change
The test should run successfully against an actual Kafka cluster in that the followings are verified:
1- Authentication with a Kafka cluster works successfully.
2- Token refresh events are handled properly by fetching a new token every time the refresh event is fired.
Producer + Consumer:
Documentation
To run the SPIRE server on kubernetes, either remote or on minikube, follow this guide